package drds.data_propagate.parse.table_meta_data;

import com.alibaba.fastsql.sql.ast.SQLDataType;
import com.alibaba.fastsql.sql.ast.SQLDataTypeImpl;
import com.alibaba.fastsql.sql.ast.SQLExpr;
import com.alibaba.fastsql.sql.ast.SQLStatement;
import com.alibaba.fastsql.sql.ast.expr.*;
import com.alibaba.fastsql.sql.ast.statement.*;
import com.alibaba.fastsql.sql.dialect.mysql.ast.MySqlPrimaryKey;
import com.alibaba.fastsql.sql.dialect.mysql.ast.MySqlUnique;
import com.alibaba.fastsql.sql.dialect.mysql.ast.expr.MySqlOrderingExpr;
import com.alibaba.fastsql.sql.repository.Schema;
import com.alibaba.fastsql.sql.repository.SchemaObject;
import com.alibaba.fastsql.sql.repository.SchemaRepository;
import com.alibaba.fastsql.util.JdbcConstants;
import drds.data_propagate.parse.table_meta_data.ddl.DruidDdlParser;
import drds.propagate.protocol.position.EntryPosition;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 基于DDL维护的内存表结构
 */
public class MemoryTableMetaDataStore implements TableMetaDataStore {

    private Logger logger = LoggerFactory.getLogger(MemoryTableMetaDataStore.class);
    private Map<List<String>, TableMetaData> schemaNameAndTableNameToTableMetaDataMap = new ConcurrentHashMap<List<String>, TableMetaData>();
    private SchemaRepository schemaRepository = new SchemaRepository(JdbcConstants.MYSQL);

    public MemoryTableMetaDataStore() {
    }

    @Override
    public boolean init(String destination) {
        return true;
    }

    @Override
    public void destory() {
        schemaNameAndTableNameToTableMetaDataMap.clear();
    }

    public boolean apply(EntryPosition entryPosition, String schemaName, String ddl, String extra) {
        schemaNameAndTableNameToTableMetaDataMap.clear();
        synchronized (this) {
            if (StringUtils.isNotEmpty(schemaName)) {
                schemaRepository.setDefaultSchema(schemaName);
            }

            try {
                // druid暂时flush privileges语法解析有问题
                if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")
                        && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")
                        && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create user")
                        && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop user")) {
                    schemaRepository.console(ddl);
                }
            } catch (Throwable e) {
                logger.warn("parse faield : " + ddl, e);
            }
        }

        // TableMetaData meta = find("tddl5_00", "ab");
        // if (meta != null) {
        // schemaRepository.setDefaultSchema("tddl5_00");
        // System.out.println(schemaRepository.console("show create table tddl5_00.ab"));
        // System.out.println(schemaRepository.console("show columns from tddl5_00.ab"));
        // }
        return true;
    }

    @Override
    public TableMetaData find(String schemaName, String tableName) {
        List<String> schemaNameAndTableName = Arrays.asList(schemaName, tableName);
        TableMetaData tableMetaData = schemaNameAndTableNameToTableMetaDataMap.get(schemaNameAndTableName);
        if (tableMetaData == null) {
            synchronized (this) {
                tableMetaData = schemaNameAndTableNameToTableMetaDataMap.get(schemaNameAndTableName);
                if (tableMetaData == null) {
                    Schema schema = schemaRepository.findSchema(schemaName);
                    if (schema == null) {
                        return null;
                    }
                    SchemaObject schemaObject = schema.findTable(tableName);
                    if (schemaObject == null) {
                        return null;
                    }
                    SQLStatement statement = schemaObject.getStatement();
                    if (statement == null) {
                        return null;
                    }
                    if (statement instanceof SQLCreateTableStatement) {
                        tableMetaData = parse((SQLCreateTableStatement) statement);
                    }
                    if (tableMetaData != null) {
                        if (tableName != null) {
                            tableMetaData.setTableName(tableName);
                        }
                        if (schemaName != null) {
                            tableMetaData.setSchemaName(schemaName);
                        }

                        schemaNameAndTableNameToTableMetaDataMap.put(schemaNameAndTableName, tableMetaData);
                    }
                }
            }
        }

        return tableMetaData;
    }

    @Override
    public boolean rollback(EntryPosition entryPosition) {
        throw new RuntimeException("not authentication_info for memory");
    }

    public Map<String, String> snapshot() {
        Map<String, String> schemaDdls = new HashMap<String, String>();
        for (Schema schema : schemaRepository.getSchemas()) {
            StringBuffer data = new StringBuffer(4 * 1024);
            for (String table : schema.showTables()) {
                SchemaObject schemaObject = schema.findTable(table);
                schemaObject.getStatement().output(data);
                data.append("; \n");
            }
            schemaDdls.put(schema.getName(), data.toString());
        }

        return schemaDdls;
    }

    private TableMetaData parse(SQLCreateTableStatement sqlCreateTableStatement) {
        int size = sqlCreateTableStatement.getTableElementList().size();
        if (size > 0) {
            TableMetaData tableMetaData = new TableMetaData();
            for (int i = 0; i < size; ++i) {
                SQLTableElement sqlTableElement = sqlCreateTableStatement.getTableElementList().get(i);
                processTableElement(sqlTableElement, tableMetaData);
            }
            return tableMetaData;
        }

        return null;
    }

    private void processTableElement(SQLTableElement sqlTableElement, TableMetaData tableMetaData) {
        if (sqlTableElement instanceof SQLColumnDefinition) {
            ColumnMetaData columnMetaData = new ColumnMetaData();
            SQLColumnDefinition sqlColumnDefinition = (SQLColumnDefinition) sqlTableElement;
            String name = getSqlName(sqlColumnDefinition.getName());
            // String charset = getSqlName(column.getCharsetExpr());
            SQLDataType sqlDataType = sqlColumnDefinition.getDataType();
            String dataTypStr = sqlDataType.getName();
            if (sqlDataType.getArguments().size() > 0) {
                dataTypStr += "(";
                for (int i = 0; i < sqlColumnDefinition.getDataType().getArguments().size(); i++) {
                    if (i != 0) {
                        dataTypStr += ",";
                    }
                    SQLExpr arg = sqlColumnDefinition.getDataType().getArguments().get(i);
                    dataTypStr += arg.toString();
                }
                dataTypStr += ")";
            }

            if (sqlDataType instanceof SQLDataTypeImpl) {
                SQLDataTypeImpl dataTypeImpl = (SQLDataTypeImpl) sqlDataType;
                if (dataTypeImpl.isUnsigned()) {
                    dataTypStr += " unsigned";
                }

                if (dataTypeImpl.isZerofill()) {
                    dataTypStr += " zerofill";
                }
            }

            if (sqlColumnDefinition.getDefaultExpr() == null || sqlColumnDefinition.getDefaultExpr() instanceof SQLNullExpr) {
                columnMetaData.setDefaultValue(null);
            } else {
                columnMetaData.setDefaultValue(DruidDdlParser.unescapeQuotaName(getSqlName(sqlColumnDefinition.getDefaultExpr())));
            }

            columnMetaData.setColumnName(name);
            columnMetaData.setColumnType(dataTypStr);
            columnMetaData.setNullable(true);
            List<SQLColumnConstraint> sqlColumnConstraintList = sqlColumnDefinition.getConstraints();
            for (SQLColumnConstraint sqlColumnConstraint : sqlColumnConstraintList) {
                if (sqlColumnConstraint instanceof SQLNotNullConstraint) {
                    columnMetaData.setNullable(false);
                } else if (sqlColumnConstraint instanceof SQLNullConstraint) {
                    columnMetaData.setNullable(true);
                } else if (sqlColumnConstraint instanceof SQLColumnPrimaryKey) {
                    columnMetaData.setKey(true);
                    columnMetaData.setNullable(false);
                } else if (sqlColumnConstraint instanceof SQLColumnUniqueKey) {
                    columnMetaData.setUnique(true);
                }
            }
            tableMetaData.addFieldMeta(columnMetaData);
        } else if (sqlTableElement instanceof MySqlPrimaryKey) {
            MySqlPrimaryKey mySqlPrimaryKey = (MySqlPrimaryKey) sqlTableElement;
            List<SQLSelectOrderByItem> sqlSelectOrderByItemList = mySqlPrimaryKey.getColumns();
            for (SQLSelectOrderByItem sqlSelectOrderByItem : sqlSelectOrderByItemList) {
                String name = getSqlName(sqlSelectOrderByItem.getExpr());
                ColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(name);
                columnMetaData.setKey(true);
                columnMetaData.setNullable(false);
            }
        } else if (sqlTableElement instanceof MySqlUnique) {
            MySqlUnique mySqlUnique = (MySqlUnique) sqlTableElement;
            List<SQLSelectOrderByItem> sqlSelectOrderByItemList = mySqlUnique.getColumns();
            for (SQLSelectOrderByItem sqlSelectOrderByItem : sqlSelectOrderByItemList) {
                String name = getSqlName(sqlSelectOrderByItem.getExpr());
                ColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(name);
                columnMetaData.setUnique(true);
            }
        }
    }

    private String getSqlName(SQLExpr sqlExpr) {
        if (sqlExpr == null) {
            return null;
        }

        if (sqlExpr instanceof SQLPropertyExpr) {
            SQLIdentifierExpr owner = (SQLIdentifierExpr) ((SQLPropertyExpr) sqlExpr).getOwner();
            return DruidDdlParser.unescapeName(owner.getName()) + "."
                    + DruidDdlParser.unescapeName(((SQLPropertyExpr) sqlExpr).getName());
        } else if (sqlExpr instanceof SQLIdentifierExpr) {
            return DruidDdlParser.unescapeName(((SQLIdentifierExpr) sqlExpr).getName());
        } else if (sqlExpr instanceof SQLCharExpr) {
            return ((SQLCharExpr) sqlExpr).getText();
        } else if (sqlExpr instanceof SQLMethodInvokeExpr) {
            return DruidDdlParser.unescapeName(((SQLMethodInvokeExpr) sqlExpr).getMethodName());
        } else if (sqlExpr instanceof MySqlOrderingExpr) {
            return getSqlName(((MySqlOrderingExpr) sqlExpr).getExpr());
        } else {
            return sqlExpr.toString();
        }
    }

    public SchemaRepository getSchemaRepository() {
        return schemaRepository;
    }

}
